package sub

import (
	"fmt"
	"os"
	"sync"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/spf13/cobra"
)

var D *Data

type Data struct {
	Ip      string
	Topic   string
	ConnNum int
}

var Sub = &cobra.Command{
	Use:   "sub",
	Short: "订阅",
	Example: `
	# 开启1个tcp连接的订阅，订阅主题为 qtmd
	mqctl sub -c 1 -t qtmd -i 192.168.40.112:1883 
`,
	Run: func(cmd *cobra.Command, args []string) {
		if D.ConnNum == 0 {
			D.ConnNum = 1
		}

		for i := 0; i < D.ConnNum; i++ {
			go work()
		}

		for {
			time.Sleep(time.Second)
		}
	},
}

func init() {
	D = &Data{}
	Sub.Flags().StringVarP(&D.Ip, "ip", "i", "", "订阅的地址")
	Sub.Flags().StringVarP(&D.Topic, "topic", "t", "", "订阅的主题")
	Sub.Flags().IntVarP(&D.ConnNum, "cn", "c", 0, "开启的订阅数量")
}

var record int

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Println(time.Now().String())
	fmt.Println("topic： ", msg.Topic())
	fmt.Println("收到数据长度: ", len(msg.Payload()))
	record++
	fmt.Println(record)
}

func work() {
	opts := mqtt.NewClientOptions().AddBroker("tcp://" + D.Ip)
	c := mqtt.NewClient(opts)
	token := c.Connect()
	if token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	go func() {
		if token := c.Subscribe(D.Topic, 0, f); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			os.Exit(1)
		}
	}()

	for {
		time.Sleep(time.Second)
	}
}
